/**
* Project Name:KettleUtil
* Date:2016年6月28日
* Copyright (c) 2016, jingma All Rights Reserved.
*/

package cn.benma666.kettle.job;

import cn.benma666.kettle.domain.VJob;
import cn.benma666.kettle.loglistener.FileLoggingEventListener;
import cn.benma666.kettle.mytuils.KettleManager;
import org.pentaho.di.job.Job;
import org.pentaho.di.trans.Trans;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

/**
 * 作业具体操作管理 <br/>
 * date: 2016年6月28日 <br/>
 * @author jingma
 * @version 0.1
 */
public class JobManager extends AbsJob {
    /**
     * 更新标志
     */
    private static Date updateFlag = new Date();

    @Override
    protected void process() throws Exception {
        //要重启的作业
        List<VJob> restartList = new ArrayList<>();
//        new ZlbdHmbd().init(null);
        //遍历运行的作业
        synchronized (KettleManager.JobMap) {
            Iterator<Entry<String, VJob>> jobIter = KettleManager.JobMap.entrySet().iterator();
            Date updateFlag1 = updateFlag;
            updateFlag = new Date();
            while(jobIter.hasNext()){
                VJob vJob = jobIter.next().getValue();
                Job job = vJob.getJob();
                String status = KettleManager.getJobStatus(vJob);
                if ((job.getResult()!=null&&!job.isActive())
                        ||Trans.STRING_STOPPED.equals(job.getStatus())
                        ||Trans.STRING_FINISHED_WITH_ERRORS.equals(job.getStatus())
                        ||State.TERMINATED.equals(job.getState())) {
                    // 运行结束
                    VJob jsonjob = KettleManager.removeJob(vJob);
                    jobIter.remove();
                    //异常停止且如果设定的异常自动重启次数大于已经自动重启的次数则执行自动重启操作
                    if(KettleManager.STOP_FAILED.equals(status)&&
                            valByDef(jsonjob.getZdcqcs(),0)>
                            jsonjob.getYcqcs()){
                        restartList.add(jsonjob);
                    }
                    info("该作业已运行结束："+vJob.getName());
                }else{
                    //刷新日志文件
                    FileLoggingEventListener ll = vJob.getFlel();
                    if(ll.getLastupdate().getTime()>updateFlag1.getTime()){
                        if(KettleManager.isWriteLogFile()){
                            ll.getOutputStream().flush();
                            FileLoggingEventListener.checkLogFileSize(vJob);
                        }
                        KettleManager.updateJobStatus(vJob);
                    }
                }
            }
        }
        for(VJob jsonjob:restartList){
            KettleManager.startJob(jsonjob);
            //已重启次数+1
            jsonjob.setYcqcs(jsonjob.getYcqcs()+1);
            info("异常停止，自动重启："+jsonjob);
        }
    }
}
